Author : Mouad HADJI, January 6th, 2018.
In this project, my main objective is to take a video feed from an onboard camera and identify the lane lines and the curvature of the road.
For this purpose, I aim to achieve the steps described below.
import os
import sys
import glob
import pickle
import cv2
import pdb
import random
from scipy import ndimage
from tqdm import tqdm
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from collections import deque
from moviepy.editor import VideoFileClip
%matplotlib inline
Here we define all the paths that we will use to import or export data during this project.
# Project directories used to import / export data throughout the notebook
project_path = os.path.join(os.getcwd())
test_path = os.path.join(project_path, "test_images")
camera_path = os.path.join(project_path, "camera_cal")
output_path = os.path.join(project_path, "output_images")
# Collect the 'test*' and 'straight_line*' image paths, then load them with cv2 (BGR order)
test_images_name = [os.path.join(test_path, imname) for imname in os.listdir(test_path) if 'test' in imname]
stght_images_name = [os.path.join(test_path, imname) for imname in os.listdir(test_path) if 'straight_line' in imname]
test_images = [cv2.imread(full_imname) for full_imname in test_images_name]
stght_images = [cv2.imread(full_imname) for full_imname in stght_images_name]
# RGB copies for display: cv2 loads BGR while matplotlib expects RGB
test_images_dis = [cv2.cvtColor(image, cv2.COLOR_BGR2RGB) for image in test_images]
stght_images_dis = [cv2.cvtColor(image, cv2.COLOR_BGR2RGB) for image in stght_images]
def show_images(images, n_row=1, n_col=2, figsize=(15, 10), cmap=None, save=False, filename=''):
    """Display up to n_row*n_col images on a subplot grid.

    Parameters
    ----------
    images : list of arrays, extra images beyond the grid size are dropped
    n_row, n_col : grid dimensions
    figsize : figure size passed to plt.subplots
    cmap : optional colormap (e.g. 'gray' for binary images)
    save : when True, save the figure to output_path as `filename`.png
    filename : basename of the saved file (only used when save is True)
    """
    fig, ax = plt.subplots(n_row, n_col, figsize=figsize)
    images = images[:n_row * n_col]
    for i, image in enumerate(images):
        plt.subplot(n_row, n_col, i + 1)
        # imshow's cmap default is None, so passing cmap=None is a no-op
        plt.imshow(image, cmap=cmap)
        plt.xticks([])
        plt.yticks([])
    # apply the layout BEFORE saving so the exported file matches the display
    # (the original saved first, producing a file without the tight layout)
    plt.tight_layout(pad=0, h_pad=0, w_pad=0)
    if save:
        plt.savefig(os.path.join(output_path, filename + '.png'))
    plt.show()
def compare_images(two_images, two_labels):
    """Show two images side by side, each with its own title."""
    fig, axes = plt.subplots(ncols=2, figsize=(20, 10))
    for axis, image, label in zip(axes, two_images, two_labels):
        axis.imshow(image)
        axis.set_title(label)
# Preview the loaded test and straight-line images (RGB copies)
show_images(test_images_dis)
show_images(stght_images_dis)
class CameraCalibration():
    """Camera calibration from 9x6 chessboard images.

    Loads every image found in `camera_path`, detects the chessboard inner
    corners, calibrates the camera with OpenCV and exposes `undistort` to
    correct new images. Calibration runs once in the constructor.
    """
    def __init__(self, camera_path):
        self.camera_path = camera_path
        # inner-corner counts of the calibration chessboard
        self.nx = 9
        self.ny = 6
        self.objpoints = [] # 3D points in real space
        self.imgpoints = [] # 2D points in image picture
        # Prepare one object-points grid reused for every image (z stays 0)
        self.objp = np.zeros((self.ny*self.nx,3), np.float32)
        self.objp[:,:2] = np.mgrid[0:self.nx,0:self.ny].T.reshape(-1,2) # x and y coordinates
        self.old_images = []
        self.new_images = []
        # calibrate immediately so the instance is ready to use
        self.run()
    def load_images(self):
        '''
        Load calibration images from class camera path
        '''
        camera_cal_list = [os.path.join(self.camera_path, imname) for imname in os.listdir(self.camera_path)]
        return [cv2.imread(full_imname) for full_imname in camera_cal_list]
    def find_corners(self, img):
        '''
        Inputs
        img: input chessboard image
        ---
        Returns
        matrix of detected corners, or None when no full chessboard was found
        '''
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        ret, corners = cv2.findChessboardCorners(gray, (self.nx, self.ny), None)
        return corners if ret else None
    def run(self):
        '''
        Load images, find corners, calibrate the camera and undistort
        the calibration images as a sanity check.
        '''
        # load all images
        camera_cal_img = self.load_images()
        # find corners in images
        camera_cal_corners = [self.find_corners(img) for img in camera_cal_img]
        for img, corners in zip(camera_cal_img, camera_cal_corners):
            if corners is not None:
                self.imgpoints.append(corners)
                self.objpoints.append(self.objp)
                # use the configured board size instead of a hard-coded (9, 6)
                new_image = cv2.drawChessboardCorners(img.copy(), (self.nx, self.ny), corners, True)
                self.old_images.append(img)
                self.new_images.append(new_image)
        ## camera calibration given all object points and image points
        img_shape = self.old_images[0].shape
        # cv2.calibrateCamera expects the image size as (width, height);
        # numpy's shape is (height, width, channels), hence the reversal
        self.ret, self.mtx, self.dist, self.rvecs, self.tvecs = cv2.calibrateCamera(
            self.objpoints, self.imgpoints, img_shape[1::-1], None, None)
        # undistorted versions of the kept calibration images
        self.undist_images = [self.undistort(img) for img in self.old_images]
    def undistort(self, img):
        '''
        Inputs
        img: distorted image
        ---
        Returns
        undistorted image using class calibration attributes
        '''
        return cv2.undistort(img, self.mtx, self.dist, None, self.mtx)
# Run the calibration on the chessboard images and inspect the results
CC = CameraCalibration(camera_path)
show_images(CC.old_images)
show_images(CC.new_images)
# Side-by-side checks of detected corners and of the distortion correction
compare_images([CC.old_images[10], CC.new_images[10]], ['Original', 'With points'])
compare_images([CC.old_images[10], CC.undist_images[10]], ['Original', 'Undistorted'])
# Undistort the road test images with the calibrated camera
test_images_undist = [CC.undistort(image) for image in test_images_dis]
stght_images_undist = [CC.undistort(image) for image in stght_images_dis]
compare_images([test_images_dis[0], test_images_undist[0]], ['Original', 'Undistorted'])
compare_images([stght_images_dis[0], stght_images_undist[0]], ['Original', 'Undistorted'])
# Pool test + straight-line images for the later pipeline experiments
sample_images = test_images_dis + stght_images_dis
sample_images_undist = test_images_undist + stght_images_undist
class ImageProcessing():
    """Color / gradient thresholding and perspective-warp utilities.

    All methods are stateless except `perspective_transform`, which caches
    the most recent transform matrices on the instance (self.M, self.Minv).
    """
    def extract_s_channel(self, img):
        """Return the S channel of `img` (assumed RGB, undistorted) in HLS space."""
        hls = cv2.cvtColor(img, cv2.COLOR_RGB2HLS)
        s_channel = hls[:,:,2]
        return s_channel
    def gray_scale(self, img):
        """Convert an RGB image to grayscale."""
        return cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    def color_thresh(self, img, color_thresh_input=(170, 255)):
        """Binary mask combining S/L-channel thresholds with white/yellow color masks.

        `color_thresh_input` bounds the S channel; the L channel uses a fixed
        lower bound of 100 and the same upper bound.
        """
        ## convert to HLS color space and separate the L and S channels
        # np.float was removed in NumPy 1.24 -- use the explicit float64 dtype
        hls_image = cv2.cvtColor(img, cv2.COLOR_BGR2HLS).astype(np.float64)
        l_channel = hls_image[:,:,1]
        s_channel = hls_image[:,:,2]
        ## convert to the HSV color space
        hsv_image = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        ## select white pixels (from HLS) and yellow pixels (from HSV)
        white_color = cv2.inRange(hls_image, np.uint8([10,200,0]), np.uint8([255,255,255]))
        yellow_color = cv2.inRange(hsv_image, np.uint8([15,60,130]), np.uint8([150,255,255]))
        ## combine yellow and white masks
        combined_color_images = cv2.bitwise_or(white_color, yellow_color)
        ## threshold the S channel
        s_binary = np.zeros_like(s_channel)
        s_binary[(s_channel >= color_thresh_input[0]) & (s_channel <= color_thresh_input[1])] = 1
        ## threshold the L channel
        l_binary = np.zeros_like(l_channel)
        l_binary[(l_channel >= 100) & (l_channel <= color_thresh_input[1])] = 1
        ## combined binaries: (S AND L) OR color mask
        combined_binary = np.zeros_like(s_channel)
        combined_binary[((s_binary > 0) & (l_binary > 0)) | (combined_color_images > 0)] = 1
        return combined_binary
    def abs_sobel_thresh(self, img, orient='x', sobel_kernel=3, sobel_thresh_input=(0,255)):
        """Binary mask of the scaled absolute Sobel derivative along one axis.

        Raises ValueError for an unknown `orient` (previously an unclear
        NameError on the undefined `sobel` variable).
        """
        thresh_min, thresh_max = sobel_thresh_input
        ## convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        ## take the derivative in x or y given orient = 'x' or 'y'
        if orient == 'x':
            sobel = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=sobel_kernel)
        elif orient == 'y':
            sobel = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=sobel_kernel)
        else:
            raise ValueError("orient must be 'x' or 'y', got {!r}".format(orient))
        ## take the absolute value of the derivative or gradient
        abs_sobel = np.abs(sobel)
        # scale to 8-bit (0 - 255) then convert to np.uint8
        scaled_sobel = np.uint8(255*abs_sobel/np.max(abs_sobel))
        # mask of 1's where thresh_min <= scaled gradient <= thresh_max
        binary_output = np.zeros_like(scaled_sobel)
        binary_output[(scaled_sobel >= thresh_min) & (scaled_sobel <= thresh_max)] = 1
        return binary_output
    def magnitude_thresh(self, img, sobel_kernel=3, mag_thresh_input=(0, 255)):
        """Binary mask thresholding the gradient magnitude sqrt(gx^2 + gy^2)."""
        ## convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        # take the gradient in x and y separately
        sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=sobel_kernel)
        sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=sobel_kernel)
        # calculate the magnitude
        sobelxy_mag = np.sqrt(sobelx**2 + sobely**2)
        # scale to 8-bit (0 - 255) and convert to np.uint8
        sobelxy_scaled = np.uint8(255*sobelxy_mag/np.max(sobelxy_mag))
        # binary mask where the magnitude thresholds are met
        binary_output = np.zeros_like(sobelxy_scaled)
        binary_output[(sobelxy_scaled >= mag_thresh_input[0]) & (sobelxy_scaled <= mag_thresh_input[1])] = 1
        return binary_output
    def direction_thresh(self, img, sobel_kernel=3, dir_thresh_input=(0, np.pi/2)):
        """Binary mask thresholding the gradient direction arctan(|gy| / |gx|)."""
        ## convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        # take the gradient in x and y separately
        sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=sobel_kernel)
        sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=sobel_kernel)
        # take the absolute value of the x and y gradients
        abs_sobelx = np.abs(sobelx)
        abs_sobely = np.abs(sobely)
        # direction of the gradient in [0, pi/2]
        dir_grad = np.arctan2(abs_sobely, abs_sobelx)
        # binary mask where the direction thresholds are met
        binary_output = np.zeros_like(dir_grad)
        binary_output[(dir_grad >= dir_thresh_input[0]) & (dir_grad <= dir_thresh_input[1])] = 1
        return binary_output
    def combine_grad_thresh(self, img, sobel_kernel=3,
                            sobel_thresh_input=(0,255),
                            mag_thresh_input=(0,255),
                            dir_thresh_input=(0, np.pi/2),
                            mag_and_dir_addon=True
                            ):
        """Combine x/y Sobel masks, optionally OR-ed with magnitude+direction masks."""
        gradx = self.abs_sobel_thresh(img, orient='x', sobel_kernel=sobel_kernel, sobel_thresh_input=sobel_thresh_input)
        grady = self.abs_sobel_thresh(img, orient='y', sobel_kernel=sobel_kernel, sobel_thresh_input=sobel_thresh_input)
        combined = np.zeros_like(gradx)
        if mag_and_dir_addon:
            mag_binary = self.magnitude_thresh(img, sobel_kernel=sobel_kernel, mag_thresh_input=mag_thresh_input)
            dir_binary = self.direction_thresh(img, sobel_kernel=sobel_kernel, dir_thresh_input=dir_thresh_input)
            combined[((gradx == 1) & (grady == 1)) | ((mag_binary == 1) & (dir_binary == 1))] = 1
        else:
            combined[((gradx == 1) & (grady == 1))] = 1
        return combined
    def combine_grad_color(self, img,
                           sobel_kernel=3,
                           sobel_thresh_input=(10,160),
                           mag_thresh_input=(30, 100),
                           dir_thresh_input=(0.7, 1.3),
                           color_thresh_input=(170, 255),
                           mag_and_dir_addon=True):
        """Denoise, then OR the gradient-based and color-based binary masks."""
        img = cv2.fastNlMeansDenoisingColored(img,7,13,21,5)
        grad_binary = self.combine_grad_thresh(img,
                                               sobel_kernel=sobel_kernel, sobel_thresh_input=sobel_thresh_input,
                                               mag_thresh_input=mag_thresh_input, dir_thresh_input=dir_thresh_input,
                                               mag_and_dir_addon=mag_and_dir_addon)
        color_binary = self.color_thresh(img, color_thresh_input=color_thresh_input)
        grad_color_binary = np.zeros_like(color_binary)
        grad_color_binary[(color_binary == 1) | (grad_binary == 1)] = 1
        return grad_color_binary
    def final_color_filter(self, img, color_thresh_input=(170, 255)):
        """Denoise, then apply only the color threshold."""
        img = cv2.fastNlMeansDenoisingColored(img,7,13,21,5)
        binary_output = self.color_thresh(img, color_thresh_input)
        return binary_output
    def select_yellow(self, img):
        """255/0 mask of yellow pixels, selected in HSV space."""
        hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
        lower = np.array([20,60,60])
        upper = np.array([38,174, 250])
        mask = cv2.inRange(hsv, lower, upper)
        return mask
    def select_white(self, img):
        """255/0 mask of near-white pixels, selected directly in RGB space."""
        lower = np.array([202,202,202])
        upper = np.array([255,255,255])
        mask = cv2.inRange(img, lower, upper)
        return mask
    def final_thresh(self, img):
        """1/0 binary combining the yellow and white color masks (pipeline default)."""
        yellow = self.select_yellow(img)
        white = self.select_white(img)
        combined_binary = np.zeros_like(yellow)
        combined_binary[(yellow >= 1) | (white >= 1)] = 1
        return combined_binary
    def perspective_transform(self, img, src_points, dst_points):
        """Warp `img` with the src->dst perspective transform.

        Returns (warped, M, Minv); also caches M / Minv on the instance.
        """
        ## define image size as (width, height) for warpPerspective
        img_size = (img.shape[1], img.shape[0])
        ## define source and destination points
        src = np.array(src_points, np.float32)
        dst = np.array(dst_points, np.float32)
        ## perform transformation (keep the inverse for un-warping later)
        self.M = cv2.getPerspectiveTransform(src, dst)
        self.Minv = cv2.getPerspectiveTransform(dst, src)
        warped = cv2.warpPerspective(img, self.M, img_size, flags=cv2.INTER_LINEAR)
        return warped, self.M, self.Minv
# Instantiate the processing toolbox and inspect each thresholding step
IP = ImageProcessing()
sample_s_channels = [IP.extract_s_channel(img) for img in sample_images_undist]
show_images(sample_s_channels, n_row=2, n_col=3)
gray_images = [IP.gray_scale(img) for img in sample_images_undist]
show_images(gray_images, n_row=2, n_col=3, cmap='gray')
#### abs_sobel_thresh
sample_sobel_images = [IP.abs_sobel_thresh(img, orient='x', sobel_thresh_input=(20, 100)) for img in sample_images_undist]
show_images(sample_sobel_images, n_row=2, n_col=3, cmap='gray')
sample_sobel_images = [IP.abs_sobel_thresh(img, orient='y', sobel_thresh_input=(20, 100)) for img in sample_images_undist]
show_images(sample_sobel_images, n_row=2, n_col=3, cmap='gray')
#### mag_thresh
sample_mag_images = [IP.magnitude_thresh(img, sobel_kernel=3, mag_thresh_input=(30, 100)) for img in sample_images_undist]
show_images(sample_mag_images, n_row=2, n_col=3, cmap='gray')
#### dir_threshold
sample_dir_images = [IP.direction_thresh(img, sobel_kernel=15, dir_thresh_input=(0.7, 1.3)) for img in sample_images_undist]
show_images(sample_dir_images, n_row=2, n_col=3, cmap='gray')
#### combining
sample_comb_images = [IP.combine_grad_thresh(img,
                                             sobel_kernel=15,
                                             sobel_thresh_input=(20,100),
                                             mag_thresh_input=(30, 100),
                                             dir_thresh_input=(0.7, 1.3),
                                             mag_and_dir_addon=False) for img in sample_images_undist]
show_images(sample_comb_images, n_row=2, n_col=3, cmap='gray')
#### color threshold
sample_col_images = [IP.color_thresh(img, color_thresh_input=(170, 255)) for img in sample_images_undist]
show_images(sample_col_images, n_row=2, n_col=3, cmap='gray')
# Compare the three candidate binary pipelines; note sample_combined_images
# is overwritten each time and ends up holding the final_thresh output
sample_combined_images = [IP.combine_grad_color(img) for img in sample_images_undist]
show_images(sample_combined_images, n_row=2, n_col=3, cmap='gray')
sample_combined_images = [IP.final_color_filter(img) for img in sample_images_undist]
show_images(sample_combined_images, n_row=2, n_col=3, cmap='gray')
sample_combined_images = [IP.final_thresh(img) for img in sample_images_undist]
show_images(sample_combined_images, n_row=2, n_col=3, cmap='gray')
# Perspective-transform source/destination points (pixel coordinates)
original = sample_images[0]
undist = sample_images_undist[0]
maxY, maxX, _ = original.shape
# computed variant, anchored to the image size
src_calc = [[585, 455],[705, 455],[1130, 720],[190, 720]]
dst_calc = [[200,0],[maxX - 200, 0],[maxX - 200, maxY],[200, maxY]]
print(src_calc)
print(dst_calc)
# hand-tuned variant -- this is the one actually used below
src_manual = [[205, 720], [1120, 720], [745, 480], [550, 480]]
dst_manual = [[205, 720], [1120, 720], [1120, 0], [205, 0]]
src_points = src_manual
dst_points = dst_manual
def perspective_transform(img, src_points, dst_points):
    """Warp `img` with the src->dst perspective transform.

    Returns the warped image plus the forward (M) and inverse (Minv)
    transform matrices.
    """
    # warpPerspective wants the size as (width, height)
    height, width = img.shape[0], img.shape[1]
    src = np.array(src_points, np.float32)
    dst = np.array(dst_points, np.float32)
    # forward and inverse homographies
    M = cv2.getPerspectiveTransform(src, dst)
    Minv = cv2.getPerspectiveTransform(dst, src)
    warped = cv2.warpPerspective(img, M, (width, height), flags=cv2.INTER_LINEAR)
    return warped, M, Minv
# Visual check: warp each undistorted sample to a birds-eye view
for idx in range(len(sample_images_undist)):
    original = sample_images_undist[idx]
    warped, M, Minv = perspective_transform(original, src_points, dst_points)
    compare_images([original, warped], ['Original', 'Birds-eye view'])
# Warp the binary threshold outputs as well (this is the lane-search input)
pipeline_test = [perspective_transform(img, src_points, dst_points)[0] for img in sample_combined_images]
show_images(pipeline_test, n_row=2, n_col=4, cmap='gray')
# Column sums over the bottom half of each warped binary image (lane bases)
histograms = [np.sum(binary_warped[binary_warped.shape[0]//2:,:], axis=0) for binary_warped in pipeline_test]
def show_plots(data_pts, n_row=2, n_col=4, figsize=(15, 6), title_name='Histogram', save=False, filename=''):
    """Plot up to n_row*n_col 1-D series on a subplot grid.

    Parameters
    ----------
    data_pts : list of 1-D sequences; extras beyond the grid size are dropped
    n_row, n_col : grid dimensions
    title_name : title repeated on every subplot
    save : when True, save the figure to output_path as `filename`.png
    """
    fig, ax = plt.subplots(n_row, n_col, figsize=figsize)
    data_pts = data_pts[:n_row * n_col]
    for i, data_pt in enumerate(data_pts):
        plt.subplot(n_row, n_col, i + 1)
        plt.plot(data_pt)
        plt.title(title_name, fontsize=10)
    # apply the layout BEFORE saving so the exported file matches the display
    # (the original saved first, producing a file without the tight layout)
    plt.tight_layout(pad=0, h_pad=0, w_pad=0)
    if save:
        plt.savefig(os.path.join(output_path, filename + '.png'))
    plt.show()
show_plots(histograms)
class LaneDetection():
    """Sliding-window lane detection on binary birds-eye images."""
    def __init__(self, ImageProcessing, new_src_points=None, new_dst_points=None, new_yRange=None):
        self.IP = ImageProcessing
        # meters per pixel = [estimate of real lane length/width] / [lane size in pixels]
        self.ym_per_pix = 11/720
        self.xm_per_pix = 3.7/1100
        # source / destination points for the perspective transform
        self.src_points = np.array([[205, 720], [1120, 720], [745, 480], [550, 480]], np.float32) if new_src_points is None else np.array(new_src_points, np.float32)
        self.dst_points = np.array([[205, 720], [1120, 720], [1120, 0], [205, 0]], np.float32) if new_dst_points is None else np.array(new_dst_points, np.float32)
        self.M = cv2.getPerspectiveTransform(self.src_points, self.dst_points)
        self.Minv = cv2.getPerspectiveTransform(self.dst_points, self.src_points)
        # y coordinate (image bottom) where curvature is measured
        self.yRange = 719 if new_yRange is None else new_yRange
    def detect_lanes(self, img, nwindows = 9, margin=110, minpix=50):
        """
        Find the polynomial representation of the lines in `img` using:
        - `nwindows` as the number of windows.
        - `margin` as the windows margin.
        - `minpix` as minimum number of pixels found to recenter the window.
        Returns (left_fit, right_fit, left_fit_curve, right_fit_curve,
        left_lane_inds, right_lane_inds, out_img, nonzerox, nonzeroy);
        the fits are None when a line could not be fitted.
        """
        # Binarize, then warp to birds-eye view
        processed_img = self.IP.final_thresh(img)
        binary_warped = self.IP.perspective_transform(processed_img, self.src_points, self.dst_points)[0]
        # Histogram of the bottom half locates the two lane bases
        histogram = np.sum(binary_warped[binary_warped.shape[0]//2:,:], axis=0)
        # Output image to draw on and visualize the result
        out_img = np.dstack((binary_warped, binary_warped, binary_warped))*255
        # np.int was removed in NumPy 1.24 -- use the builtin int instead
        midpoint = int(histogram.shape[0]/2)
        leftx_base = np.argmax(histogram[:midpoint])
        rightx_base = np.argmax(histogram[midpoint:]) + midpoint
        # Set height of windows
        window_height = int(binary_warped.shape[0]/nwindows)
        # x and y positions of all nonzero pixels in the image
        nonzero = binary_warped.nonzero()
        nonzeroy = np.array(nonzero[0])
        nonzerox = np.array(nonzero[1])
        # Current positions to be updated for each window
        leftx_current = leftx_base
        rightx_current = rightx_base
        # Lists receiving left and right lane pixel indices
        left_lane_inds = []
        right_lane_inds = []
        # Step through the windows one by one (bottom to top)
        for window in range(nwindows):
            win_y_low = binary_warped.shape[0] - (window+1)*window_height
            win_y_high = binary_warped.shape[0] - window*window_height
            win_xleft_low = leftx_current - margin
            win_xleft_high = leftx_current + margin
            win_xright_low = rightx_current - margin
            win_xright_high = rightx_current + margin
            # Draw the windows on the visualization image
            cv2.rectangle(out_img,(win_xleft_low,win_y_low),(win_xleft_high,win_y_high),(0,255,0), 2)
            cv2.rectangle(out_img,(win_xright_low,win_y_low),(win_xright_high,win_y_high),(0,255,0), 2)
            # Nonzero pixels inside each window
            good_left_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & (nonzerox >= win_xleft_low) & (nonzerox < win_xleft_high)).nonzero()[0]
            good_right_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & (nonzerox >= win_xright_low) & (nonzerox < win_xright_high)).nonzero()[0]
            left_lane_inds.append(good_left_inds)
            right_lane_inds.append(good_right_inds)
            # Recenter the next window on the mean position when enough pixels found
            if len(good_left_inds) > minpix:
                leftx_current = int(np.mean(nonzerox[good_left_inds]))
            if len(good_right_inds) > minpix:
                rightx_current = int(np.mean(nonzerox[good_right_inds]))
        # Concatenate the per-window index arrays
        left_lane_inds = np.concatenate(left_lane_inds)
        right_lane_inds = np.concatenate(right_lane_inds)
        # Extract left and right line pixel positions
        leftx = nonzerox[left_lane_inds]
        lefty = nonzeroy[left_lane_inds]
        rightx = nonzerox[right_lane_inds]
        righty = nonzeroy[right_lane_inds]
        # Fit second-order polynomials in pixel and in world space.
        # polyfit raises TypeError on empty vectors and LinAlgError when the
        # fit fails; catch those specifically (the previous bare except also
        # swallowed KeyboardInterrupt / SystemExit).
        try:
            left_fit = np.polyfit(lefty, leftx, 2)
            left_fit_curve = np.polyfit(lefty*self.ym_per_pix, leftx*self.xm_per_pix, 2)
        except (TypeError, ValueError, np.linalg.LinAlgError):
            left_fit = None
            left_fit_curve = None
        try:
            right_fit = np.polyfit(righty, rightx, 2)
            right_fit_curve = np.polyfit(righty*self.ym_per_pix, rightx*self.xm_per_pix, 2)
        except (TypeError, ValueError, np.linalg.LinAlgError):
            right_fit = None
            right_fit_curve = None
        return left_fit, right_fit, left_fit_curve, right_fit_curve, left_lane_inds, right_lane_inds, out_img, nonzerox, nonzeroy
    def plot_lanes(self, img, ax):
        """
        Visualize the windows and fitted lines for `img` on axis `ax`.
        Returns (left_fit, right_fit, left_fit_curve, right_fit_curve).
        """
        left_fit, right_fit, left_fit_curve, right_fit_curve, left_lane_inds, right_lane_inds, out_img, nonzerox, nonzeroy = self.detect_lanes(img)
        # Evaluate both fits over the full image height
        ploty = np.linspace(0, img.shape[0]-1, img.shape[0] )
        left_fitx = left_fit[0]*ploty**2 + left_fit[1]*ploty + left_fit[2]
        right_fitx = right_fit[0]*ploty**2 + right_fit[1]*ploty + right_fit[2]
        # Color the detected pixels: left in red, right in blue
        out_img[nonzeroy[left_lane_inds], nonzerox[left_lane_inds]] = [255, 0, 0]
        out_img[nonzeroy[right_lane_inds], nonzerox[right_lane_inds]] = [0, 0, 255]
        ax.imshow(out_img)
        ax.plot(left_fitx, ploty, color='yellow')
        ax.plot(right_fitx, ploty, color='yellow')
        return left_fit, right_fit, left_fit_curve, right_fit_curve
    def show_lanes(self, images, cols = 2, rows = 4, figsize=(15,13)):
        """
        Display `images` on a [`rows`, `cols`] subplot grid and store the
        pixel-space and world-space fits in self.fits / self.fits_curve.
        """
        fig, axes = plt.subplots(rows, cols, figsize=figsize)
        indexes = range(cols * rows)
        self.fits = []
        self.fits_curve = []
        for ax, idx in zip(axes.flat, indexes):
            if idx < len(images):
                image = images[idx]
                left_fit, right_fit, left_fit_curve, right_fit_curve = self.plot_lanes(image, ax)
                self.fits.append((left_fit, right_fit))
                self.fits_curve.append((left_fit_curve, right_fit_curve))
    def get_curvature(self, yRange, side_fit_curve):
        """
        Return the curvature radius (meters) of `side_fit_curve` at `yRange`,
        or None when the fit is missing / malformed.
        """
        try:
            return ((1 + (2*side_fit_curve[0]*yRange*self.ym_per_pix + side_fit_curve[1])**2)**1.5) / np.absolute(2*side_fit_curve[0])
        except (TypeError, IndexError):
            # side_fit_curve is None (failed fit) or not subscriptable
            return None
# Run lane detection on all undistorted samples and plot the fitted lines
LD = LaneDetection(IP)
LD.show_lanes(sample_images_undist)
# Curvature (converted to km) at the bottom of the image for every sample
yRange = 719
for fit_curve in LD.fits_curve:
    left_fit_curve, right_fit_curve = fit_curve
    # NOTE(review): get_curvature returns None on a failed fit, which would
    # make this division raise -- assumes all samples produced valid fits
    left_curvature = LD.get_curvature(yRange, left_fit_curve) / 1000
    right_curvature = LD.get_curvature(yRange, right_fit_curve) / 1000
    print('Left : {:.2f} km, Right : {:.2f} km'.format(left_curvature, right_curvature))
class Drawer():
    """Project fitted lane polynomials back onto the original road image."""
    def __init__(self, LaneDetection):
        # keep a handle on the detector (provides detect_lanes and Minv)
        self.LD = LaneDetection
    def draw_line(self, img, left_fit, right_fit):
        '''
        Inputs
        img: input image
        left_fit: polynomial of left lane line
        right_fit: polynomial of right lane line
        ---
        Returns
        `img` blended with the lane area drawn between the two polynomials
        '''
        height = img.shape[0]
        ploty = np.linspace(0, height - 1, height)
        overlay = np.zeros_like(img).astype(np.uint8)
        # evaluate both quadratics over the full image height
        left_fitx = np.polyval(left_fit, ploty)
        right_fitx = np.polyval(right_fit, ploty)
        # build a closed polygon: left line top->bottom, right line bottom->top
        pts_left = np.array([np.transpose(np.vstack([left_fitx, ploty]))])
        pts_right = np.array([np.flipud(np.transpose(np.vstack([right_fitx, ploty])))])
        polygon = np.hstack((pts_left, pts_right))
        # paint the lane area onto the blank birds-eye overlay
        cv2.fillPoly(overlay, np.int_([polygon]), (0,255, 0))
        # warp the overlay back to camera view and blend it into the image
        unwarped = cv2.warpPerspective(overlay, self.LD.Minv, (img.shape[1], height))
        return cv2.addWeighted(img, 1, unwarped, 0.3, 0)
    def draw_lane_on_image(self, img):
        '''
        Inputs
        img: input image
        ---
        Returns
        `img` with the detected lane area drawn on it
        '''
        left_fit, right_fit = self.LD.detect_lanes(img)[:2]
        return self.draw_line(img, left_fit, right_fit)
# Draw the detected lane area on every undistorted sample image
Dr = Drawer(LD)
images_with_drawings = [Dr.draw_lane_on_image(img) for img in sample_images_undist]
show_images(images_with_drawings, n_row=4, n_col=2, cmap='gray')
def calculate_lines_info(img):
    """
    Detect the lane lines in `img` and measure curvature and vehicle offset.
    Returns [left_fit, right_fit, left_fit_curve, right_fit_curve,
    left_curvature, right_curvature, delta_from_center] -- the offset is
    None when either line was not detected. Uses the global LD and yRange.
    """
    # pixel- and world-space fits
    left_fit, right_fit, left_fit_curve, right_fit_curve, _, _, _, _, _ = LD.detect_lanes(img)
    # curvature radii in meters at the measurement row
    left_curvature = LD.get_curvature(yRange, left_fit_curve)
    right_curvature = LD.get_curvature(yRange, right_fit_curve)
    # image extents converted to meters; the camera sits at the image center
    x_max = img.shape[1]*LD.xm_per_pix
    y_max = img.shape[0]*LD.ym_per_pix
    vehicle_center = x_max / 2
    # offset of the camera from the midpoint between both lines (meters)
    delta_from_center = None
    if left_fit is not None and right_fit is not None:
        left_line_pos = np.polyval(left_fit_curve, y_max)
        right_line_pos = np.polyval(right_fit_curve, y_max)
        delta_from_center = vehicle_center - (left_line_pos + right_line_pos)/2
    return [left_fit, right_fit, left_fit_curve, right_fit_curve, left_curvature, right_curvature, delta_from_center]
def display_info(img, lines_info, font = cv2.FONT_HERSHEY_SIMPLEX, font_scale=1, font_color = (255, 255, 255)):
    """
    Draw the lane area and overlay curvature / vehicle-offset text on `img`.

    `lines_info` is the list returned by calculate_lines_info; its curvature
    and center entries may be None when detection failed, so every format
    call is guarded (the original raised TypeError on None > 0 / {:.0f}).
    """
    left_fit, right_fit, _, _, left_curvature, right_curvature, delta_from_center = lines_info
    output = Dr.draw_line(img, left_fit, right_fit)
    # Adjust message, guarding against a failed detection
    if delta_from_center is None:
        message = 'at an unknown distance'
    elif delta_from_center > 0:
        message = '{:.2f} m right'.format(delta_from_center)
    else:
        message = '{:.2f} m left'.format(-delta_from_center)
    left_text = 'Left curvature: {:.0f} m'.format(left_curvature) if left_curvature is not None else 'Left curvature: n/a'
    right_text = 'Right curvature: {:.0f} m'.format(right_curvature) if right_curvature is not None else 'Right curvature: n/a'
    # Add info
    cv2.putText(output, left_text, (50, 50), font, font_scale, font_color, 2)
    cv2.putText(output, right_text, (50, 120), font, font_scale, font_color, 2)
    cv2.putText(output, 'Vehicle is {} of center'.format(message), (50, 190), font, font_scale, font_color, 2)
    return output
def pipeline(img):
    """Full single-image pipeline: measure the lane, then annotate the frame."""
    return display_info(img, calculate_lines_info(img))
# Run the full annotation pipeline on the sample images and inspect results
pipeline_output = [pipeline(img) for img in sample_images_undist]
show_images(pipeline_output[-4:], n_row=2, n_col=2, cmap='gray')
compare_images([sample_images_undist[1], pipeline_output[1]], ['Original', 'Processed'])
class Line():
    """Track the state and history of a single lane line across video frames.

    The check_* methods record every candidate value and report whether it
    passes the sanity thresholds in `robustness_params`.
    """
    def __init__(self, frames_to_keep, robustness_params, label=None):
        # number of past frames to keep (was accepted but silently dropped;
        # stored so smoothing code can use it)
        self.frames_to_keep = frames_to_keep
        # Robustness params: sanity-check thresholds (see robustness_params dict)
        self.params = robustness_params
        # Name of the line ('LEFT' / 'RIGHT')
        self.label = label
        # was the line detected in the last iteration?
        self.detected = False
        # polynomial coefficients for the most recent fit
        self.current_fit = [np.array([False])]
        # polynomial coefficients for the most recent fit in real world metrics
        self.current_fit_curve = [np.array([False])]
        # history of every candidate fit seen
        self.current_fit_history = []
        # history of relative changes between consecutive fits
        self.current_fit_change_history = []
        # log of accept/reject decisions for the fit
        self.current_fit_update_history = []
        # radius of curvature of the line (meters)
        self.radius_of_curvature = None
        # history of every candidate curvature seen
        self.radius_of_curvature_history = []
        # history of relative curvature changes
        self.radius_of_curvature_change_history = []
        # log of accept/reject decisions for the curvature
        self.radius_of_curvature_update_history = []
    def check_value_curvature(self, curve_new):
        """Record curve_new and return True when it is below the raw-value threshold."""
        self.radius_of_curvature_history.append(curve_new)
        return curve_new < self.params["CURVATURE_RAW_CHECK"]
    def check_change_curvature(self, curve_new):
        """Record and bound the relative change versus the stored curvature."""
        ## absolute relative change versus the last accepted value
        change = np.abs((self.radius_of_curvature - curve_new) / self.radius_of_curvature)
        ## store value
        self.radius_of_curvature_change_history.append(change)
        return change < self.params["CURVATURE_CHANGE_STABILITY"]
    def check_value_fit(self, coef_new):
        """Record coef_new and return True when every coefficient magnitude is plausible."""
        self.current_fit_history.append(coef_new)
        # renamed from `bool`, which shadowed the builtin
        passed = all([
            np.abs(coef_new[0]) < self.params["CURVE_A_RAW_CHECK_A"],
            np.abs(coef_new[1]) < self.params["CURVE_A_RAW_CHECK_B"],
            np.abs(coef_new[2]) < self.params["CURVE_A_RAW_CHECK_C"]
        ])
        return passed
    def check_change_fit(self, coef_new):
        """Record and bound the relative change of each coefficient versus current_fit."""
        ## relative change versus the last accepted fit (callers guard that
        ## current_fit holds a real fit before calling this)
        change = (self.current_fit - coef_new) / self.current_fit
        ## store values
        self.current_fit_change_history.append(change)
        passed = all([
            np.abs(change[0]) < self.params["CURVE_CHANGE_STABILITY_A"],
            np.abs(change[1]) < self.params["CURVE_CHANGE_STABILITY_B"],
            np.abs(change[2]) < self.params["CURVE_CHANGE_STABILITY_C"]
        ])
        return passed
# Sanity-check thresholds consumed by the Line class check_* methods
robustness_params = {
    # curvature raw value: upper bound on an accepted curvature radius
    "CURVATURE_RAW_CHECK":1000000,
    # curvature change value: max relative change between frames
    "CURVATURE_CHANGE_STABILITY":10,
    # curve fit raw values: per-coefficient magnitude bounds (a*y^2 + b*y + c)
    "CURVE_A_RAW_CHECK_A":0.0005,
    "CURVE_A_RAW_CHECK_B":0.5,
    "CURVE_A_RAW_CHECK_C":1500,
    # curve fit change values: per-coefficient relative-change bounds
    "CURVE_CHANGE_STABILITY_A":500,
    "CURVE_CHANGE_STABILITY_B":100,
    "CURVE_CHANGE_STABILITY_C":0.5,
}
class videoPipeline():
def __init__(self, robustness_params, frames_to_keep=10):
# initialize helpers class
self.CC = CameraCalibration(camera_path)
self.IP = ImageProcessing()
self.LD = LaneDetection(self.IP)
self.Dr = Drawer(self.LD)
# initialize lane lines
self.LEFT_LINE = Line(frames_to_keep, robustness_params, 'LEFT')
self.RIGHT_LINE = Line(frames_to_keep, robustness_params, 'RIGHT')
self.DELTA_FROM_CENTER = None
def calculate_lines_info(self, img):
"""
Find and draw the lane lines on the image `img`.
"""
# Gather fits
left_fit, right_fit, left_fit_curve, right_fit_curve, _, _, _, _, _ = self.LD.detect_lanes(img)
# Gather curvature
left_curvature, right_curvature = self.LD.get_curvature(self.LD.yRange, left_fit_curve), self.LD.get_curvature(self.LD.yRange, right_fit_curve)
# Calculate vehicle center
x_max = img.shape[1]*self.LD.xm_per_pix
y_max = img.shape[0]*self.LD.ym_per_pix
vehicle_center = x_max / 2
# Calculate delta between vehicle center and lane center
if left_fit is not None and right_fit is not None:
left_line_pos = left_fit_curve[0]*y_max**2 + left_fit_curve[1]*y_max + left_fit_curve[2]
right_line_pos = right_fit_curve[0]*y_max**2 + right_fit_curve[1]*y_max + right_fit_curve[2]
line_center = (left_line_pos + right_line_pos)/2
delta_from_center = vehicle_center - line_center
else:
delta_from_center = None
return [left_fit, right_fit, left_fit_curve, right_fit_curve, left_curvature, right_curvature, delta_from_center]
def display_info(self, img, lines_info, font = cv2.FONT_HERSHEY_SIMPLEX, font_scale=1, font_color = (255, 255, 255)):
left_fit, right_fit, _, _, left_curvature, right_curvature, delta_from_center = lines_info
output = self.Dr.draw_line(img, left_fit, right_fit)
# Adjust message
message = '{:.2f} m right'.format(delta_from_center) if delta_from_center > 0 else '{:.2f} m left'.format(-delta_from_center)
# Add info
cv2.putText(output, 'Left curvature: {:.0f} m'.format(left_curvature), (50, 50), font, font_scale, font_color, 2)
cv2.putText(output, 'Right curvature: {:.0f} m'.format(right_curvature), (50, 120), font, font_scale, font_color, 2)
cv2.putText(output, 'Vehicle is {} of center'.format(message), (50, 190), font, font_scale, font_color, 2)
return output
def _sanitize_fit(self, line, fit, fit_curve):
    """
    Sanity-check a freshly fitted polynomial against `line`'s history.

    Accepts the new fit when the line was detected, its raw values pass
    check_value_fit, and either there is no meaningful history yet
    (<= 1 entries) or the frame-to-frame change passes check_change_fit.
    On rejection, falls back to the line's last accepted fit and records
    the reason in current_fit_update_history.

    Returns the (fit, fit_curve) pair to use for this frame.
    """
    if line.detected and line.check_value_fit(fit):
        # With at most one past fit there is nothing to compare against.
        if len(line.current_fit_history) <= 1 or line.check_change_fit(fit):
            line.current_fit = fit
            line.current_fit_curve = fit_curve
            line.current_fit_update_history.append("OK")
            return fit, fit_curve
        # History exists but the change is too brutal -> keep the old fit.
        line.current_fit_update_history.append("Brutal change.")
    else:
        # Line not detected, or the raw values failed the plausibility check.
        line.current_fit_update_history.append("Value issue.")
    return line.current_fit, line.current_fit_curve

def _sanitize_curvature(self, line, curvature):
    """
    Apply the same sanity-check pattern to the radius of curvature.

    Returns the curvature to use for this frame (the new value when
    accepted, otherwise the line's last accepted one).
    """
    if line.detected and line.check_value_curvature(curvature):
        if len(line.radius_of_curvature_history) <= 1 or line.check_change_curvature(curvature):
            line.radius_of_curvature = curvature
            line.radius_of_curvature_update_history.append("OK")
            return curvature
        line.radius_of_curvature_update_history.append("Brutal change.")
    else:
        line.radius_of_curvature_update_history.append("Value issue.")
    return line.radius_of_curvature

def smart_processing(self, img):
    """
    Full per-frame processing: detect the lanes, sanity-check the results
    against each line's history (falling back to the last good values on
    failure), and return the annotated frame.

    REFACTOR: the LEFT/RIGHT fit and curvature checks were four near-identical
    inline copies; they now share _sanitize_fit / _sanitize_curvature with the
    same call order and arguments as the original.
    """
    left_fit, right_fit, left_fit_curve, right_fit_curve, left_curvature, right_curvature, delta_from_center = self.calculate_lines_info(img)
    # A side counts as detected only when a fit was actually produced.
    self.LEFT_LINE.detected = left_fit is not None
    self.RIGHT_LINE.detected = right_fit is not None
    ## Sanity checks: LEFT then RIGHT, fit before curvature (original order) ##
    left_fit, left_fit_curve = self._sanitize_fit(self.LEFT_LINE, left_fit, left_fit_curve)
    left_curvature = self._sanitize_curvature(self.LEFT_LINE, left_curvature)
    right_fit, right_fit_curve = self._sanitize_fit(self.RIGHT_LINE, right_fit, right_fit_curve)
    right_curvature = self._sanitize_curvature(self.RIGHT_LINE, right_curvature)
    ### Process DELTA_FROM_CENTER: keep the last known offset when the ###
    ### current frame could not provide one.                            ###
    if delta_from_center is None:
        delta_from_center = self.DELTA_FROM_CENTER
    else:
        self.DELTA_FROM_CENTER = delta_from_center
    safe_lines_info = left_fit, right_fit, left_fit_curve, right_fit_curve, left_curvature, right_curvature, delta_from_center
    return self.display_info(img, safe_lines_info)
def run(self, input_video, output_video):
    """
    Process `input_video` frame by frame with smart_processing and write
    the annotated result to `output_video` (video only, audio stripped).
    """
    source_clip = VideoFileClip(input_video)
    annotated_clip = source_clip.fl_image(self.smart_processing)
    annotated_clip.write_videofile(output_video, audio=False)
# Run the full pipeline on the project video.
vP = videoPipeline(robustness_params)
vP.run('project_video.mp4', 'project_video_output.mp4')

# --- Diagnostics: radius-of-curvature histories -----------------------------
CURVATURE_RAW_CHECK = robustness_params["CURVATURE_RAW_CHECK"]
CURVATURE_CHANGE_STABILITY = robustness_params["CURVATURE_CHANGE_STABILITY"]

def _valid_values(values, upper=None):
    """Drop None entries; if `upper` is given, also drop values >= upper."""
    kept = [v for v in values if v is not None]
    return kept if upper is None else [v for v in kept if v < upper]

# LEFT line: raw curvature distribution and the series capped at the raw-check
# threshold. (None entries are now filtered everywhere; the original filtered
# them for the RIGHT line only, so the LEFT plots could raise TypeError.)
LEFT_LINE_stats = pd.Series(vP.LEFT_LINE.radius_of_curvature_history).describe()
plt.hist(_valid_values(vP.LEFT_LINE.radius_of_curvature_history), bins = np.arange(0, CURVATURE_RAW_CHECK, 10000))
plt.show()
plt.plot(_valid_values(vP.LEFT_LINE.radius_of_curvature_history, CURVATURE_RAW_CHECK))
plt.show()

# RIGHT line. (BUGFIX: the stats were originally computed from vP.LEFT_LINE.)
RIGHT_LINE_stats = pd.Series(vP.RIGHT_LINE.radius_of_curvature_history).describe()
plt.hist(_valid_values(vP.RIGHT_LINE.radius_of_curvature_history), bins = np.arange(0, CURVATURE_RAW_CHECK, 10000))
plt.show()
plt.plot(_valid_values(vP.RIGHT_LINE.radius_of_curvature_history, CURVATURE_RAW_CHECK))
plt.show()

# Frame-to-frame curvature *change* distributions, both lines.
plt.hist(_valid_values(vP.LEFT_LINE.radius_of_curvature_change_history), bins=list(np.arange(0.0, CURVATURE_CHANGE_STABILITY, 0.1)))
plt.show()
plt.plot(_valid_values(vP.LEFT_LINE.radius_of_curvature_change_history, CURVATURE_CHANGE_STABILITY))
plt.show()
plt.hist(_valid_values(vP.RIGHT_LINE.radius_of_curvature_change_history), bins=list(np.arange(0.0, CURVATURE_CHANGE_STABILITY, 0.1)))
plt.show()
plt.plot(_valid_values(vP.RIGHT_LINE.radius_of_curvature_change_history, CURVATURE_CHANGE_STABILITY))
plt.show()

# How often each curvature sanity-check outcome occurred.
print(pd.Series(vP.LEFT_LINE.radius_of_curvature_update_history).value_counts())
print(pd.Series(vP.RIGHT_LINE.radius_of_curvature_update_history).value_counts())
def clean_curve_fit_history(list_raw):
    """
    Filter None entries out of a polynomial-fit history and split the
    surviving (a, b, c) coefficient triples into per-coefficient series.

    Returns (cleaned_history, a_history, b_history, c_history).
    """
    cleaned = [list(coeffs) for coeffs in list_raw if coeffs is not None]
    a_history, b_history, c_history = [], [], []
    for coeffs in cleaned:
        a_history.append(coeffs[0])
        b_history.append(coeffs[1])
        c_history.append(coeffs[2])
    return cleaned, a_history, b_history, c_history
# --- Diagnostics: polynomial-fit coefficient histories ----------------------

# Transpose each line's (a, b, c) fit history into per-coefficient series.
left_fit_history, left_a, left_b, left_c = clean_curve_fit_history(vP.LEFT_LINE.current_fit_history)
right_fit_history, right_a, right_b, right_c = clean_curve_fit_history(vP.RIGHT_LINE.current_fit_history)
left_fit_change_history, left_change_a, left_change_b, left_change_c = clean_curve_fit_history(vP.LEFT_LINE.current_fit_change_history)
right_fit_change_history, right_change_a, right_change_b, right_change_c = clean_curve_fit_history(vP.RIGHT_LINE.current_fit_change_history)

# Sanity-check thresholds, pulled out for reference against the plots below.
CURVE_A_RAW_CHECK_A = robustness_params["CURVE_A_RAW_CHECK_A"]
CURVE_A_RAW_CHECK_B = robustness_params["CURVE_A_RAW_CHECK_B"]
CURVE_A_RAW_CHECK_C = robustness_params["CURVE_A_RAW_CHECK_C"]
CURVE_CHANGE_STABILITY_A = robustness_params["CURVE_CHANGE_STABILITY_A"]
CURVE_CHANGE_STABILITY_B = robustness_params["CURVE_CHANGE_STABILITY_B"]
CURVE_CHANGE_STABILITY_C = robustness_params["CURVE_CHANGE_STABILITY_C"]

# LEFT line -- raw coefficient values.
# BUGFIX: the columns were mislabelled 'right_*' although they hold LEFT data,
# and pd.DataFrame.from_items has been removed from pandas -- a plain dict
# constructor gives the same column order.
print(pd.DataFrame({'left_a': left_a, 'left_b': left_b, 'left_c': left_c}).describe())
show_plots([left_a, left_b, left_c], n_row=1, n_col=3, figsize=(15, 5), title_name='Curve History')
# LEFT line -- frame-to-frame coefficient changes.
show_plots([left_change_a, left_change_b, left_change_c], n_row=1, n_col=3, figsize=(15, 5), title_name='Left Curve Change History')

# RIGHT line -- raw coefficient values.
print(pd.DataFrame({'right_a': right_a, 'right_b': right_b, 'right_c': right_c}).describe())
show_plots([right_a, right_b, right_c], n_row=1, n_col=3, figsize=(15, 5), title_name='Curve History')
# RIGHT line -- frame-to-frame coefficient changes (title typo fixed).
show_plots([right_change_a, right_change_b, right_change_c], n_row=1, n_col=3, figsize=(15, 5), title_name='Right Curve Change History')

# How often each fit sanity-check outcome occurred.
print(pd.Series(vP.LEFT_LINE.current_fit_update_history).value_counts())
print(pd.Series(vP.RIGHT_LINE.current_fit_update_history).value_counts())

# Counts of large frame-to-frame jumps per coefficient (threshold tuning aid);
# wrapped in print() so the values are visible when run as a plain script.
print(np.sum([x > 0.5 for x in right_change_c]))
print(np.sum([np.abs(x) > 500 for x in left_change_a]))
print(np.sum([np.abs(x) > 500 for x in right_change_a]))
print(np.sum([np.abs(x) > 20 for x in left_change_b]))
print(np.sum([np.abs(x) > 20 for x in right_change_b]))